package com.ftl.search.listeners;

import com.ftl.search.common.utils.JacksonSerializeUtils;
import com.ftl.search.model.dto.CUDParamDTO;
import com.ftl.search.service.impl.CommonSearchService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class ModifyDataListener {

    @Resource
    private CommonSearchService commonSearchService;

    @Resource
    private KafkaTemplate kafkaTemplate;

    @KafkaListener(topics = {"SEARCH_MODIFY_DATA_TOPIC"}, autoStartup = "${spring.kafka.startListener}")
    public void process(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            CUDParamDTO dto = JacksonSerializeUtils.deserialization(record.value(), CUDParamDTO.class);
            if (commonSearchService.checkRecord(dto)) {
                // 处理数据
                commonSearchService.modifyWithES(dto);
            }
            // 手动提交offset
            ack.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            kafkaTemplate.send("SEARCH_MODIFY_DATA_TOPIC.DLT", record.value());
        }
    }
}
